iT邦幫忙

2021 iThome 鐵人賽

DAY 11
0
Software Development

從零開始Reactive Programming- Spring系列 第 12

[Day 11] Reactive Programming - Reactor(Scheduler)

  • 分享至 

  • xImage
  •  

前言

Reactor 是concurrency-agnostic ,花了一點時間研究這個英文單字的意思,concurrency是我們熟悉的併發,agnostic是未知論者(認為神存在但不為人知也無法確認的),推測的意思是對於Reactor來說並不在意concurrency是存在或是不存在,取決於使用者的行為,希望有高手可以解釋得更精準,Reactor文件上給出的意思是他不會強迫使用併發(concurrency model), 讓開發者自己去決定,如果你需要使用concurrency,Reactor提供了Scheduler來幫助開發者。

Scheduler

在沒有特別設定的情況下,Flux & Mono並不是特別一個專門的緒(Thread)去處理,而是根據最後subscribe()的緒來決定的。在官方的範例中,在main裡面宣告Mono,另開一條Threadsubscribe,從印出的結果就可以看出實際上執行緒是根據subscribe的。

public static void main(String[] args) throws InterruptedException { 
    System.out.println(Thread.currentThread().getName()); 
    final Mono<String> mono = Mono.just("hello "); 
    Thread t = 
        new Thread( 
            () -> 
                mono.map(msg -> msg + "thread ") 
                    .subscribe(v -> 
                    System.out.println(
                        v + Thread.currentThread().getName())
                        )); 
    t.start(); 
    t.join(); 
    // main 
    // hello thread Thread-0 
  }

工具類別Schedulers提供了幾個靜態方法:

  • Schedulers.immediate():基本上不會用到這個方法,他不會做任何的操作,可以當成是null,有可能的使用場景是某個api需要傳入Schedulers,但你並不想要更換Thread,這時候就可以傳入Schedulers.immediate()
  • Schedulers.single():只有一條且重複使用的Thread
  • Schedulers.elastic():會彈性的增加Thread(無上限),適用於需要較長時間處理的任務(task),像是呼叫阻斷(blocking)的服務或是I/O,但可能會導致太多的Thread或是一些backpressure的問題,再推出Schedulers.boundedElastic()後就不建議使用(Deprecated)。
  • Schedulers.boundedElastic():就像是elastic(),只是加上了一些限制來避免產生過多Thread的問題,有一個worker pool,預設閒置60秒就會release Thread
  • Schedulers.parallel():適用於快速且non-blocking的任務,根據CPU來產生Thread的數量。
  • fromExecutorService(ExecutorService):如果以前有預先就存在的ExecutorService
    上面這幾種方法除了fromExecutorService,都是有一個共用(global),如果希望可以單獨使用可以利用Schedulers.newXXXX()的方式來新增。

有一些operator使用指定的Scheduler來執行,通常也會讓你透過傳入參數的方式來改變,像是Flux.interval(Duration.ofMillis(300)),每三百毫秒推送,從程式碼可以看出預設使用Schedulers.parallel(),如果想要自己指定也可以直接傳入指定的Scheduler

public static Flux<Long> interval(Duration period) {
 return interval(period, Schedulers.parallel());
}

VirtualTimeScheduler

除了以上的Scheduler之外,其實還隱藏一個VirtualTimeScheduler,屬於reactor.test.scheduler,有自己的一個虛擬時鐘來控制時間,隨需的增加,控制時間可能不夠精準,因為只能前進,不能讓時光倒流,這在需要時間流逝才可以測試的場景就會十分好用方便,而不需要真的去等待。

下面這個例子是有一個延遲10秒、每5秒會發送的Flux,正常的情況下是看不到任何結果的,因為main-thread一下子就結束了,根本還來不及。如果想要看到資料就要在main thread加上sleep來延遲時間,這邊第一個不是剛好十秒是因為會有一點時間差,這樣等待了約二十秒後就可以看到預期的結果。

  @Test
  void testDefaultScheduler() throws InterruptedException {
    List<Long> list = new ArrayList<>();
    Flux
        .interval(Duration.ofSeconds(10), Duration.ofSeconds(5))
        .take(3)
        .subscribe(list::add);

    Thread.sleep(10500);
    System.out.println(list);
    Thread.sleep(5000);
    System.out.println(list);
    Thread.sleep(5000);
    System.out.println(list);
    /*
    [0] 
    [0, 1] 
    [0, 1, 2]
    */
  }

這時候如果有VirtualTimeScheduler的幫助,手動加速時間,就可以在正常main跑完的同時,就可以看到結果,節省了不少時間。

  @Test
  void testVirtualTimeScheduler() throws InterruptedException {
    List<Long> list = new ArrayList<>();

    VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
    Flux
        .interval(Duration.ofSeconds(10), Duration.ofSeconds(5), scheduler)
        .take(3)
        .subscribe(list::add);

    scheduler.advanceTimeBy(Duration.ofSeconds(10));
    System.out.println(list);
    scheduler.advanceTimeBy(Duration.ofSeconds(5));
    System.out.println(list);
    scheduler.advanceTimeBy(Duration.ofSeconds(5));
    System.out.println(list);
    /* 
    [0] 
    [0, 1] 
    [0, 1, 2] 
    */
  }

結語

Reactor提供各式的方便的工具讓開發者可以根據使用情境來選擇,下一篇就來介紹到底Reactor是要如何去使用這些Scheduler。隨著文章往後開始比較複雜,如果有寫得不太好的地方希望可以留言討論,感謝!

資料來源


上一篇
[Day 10] Reactive Programming - Reactor (generate & create)
下一篇
[Day 12] Reactive Programming - Reactor(publishOn/subscribeOn)
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言